Support Arrow IPC Stream Files #18457
Conversation
// correct offset which is a lot of duplicate I/O. We're opting to avoid
// that entirely by only acting on a single partition and reading sequentially.
Ok(None)
}
This is perhaps the weightiest decision in this PR. If we want to repartition a file in the IPC stream format, then we need to read from the beginning of the file for each partition, or figure out another way to create the ad-hoc equivalent of the IPC file format footer so we can minimize duplicate reads (likely by reading the entire file all the way through once and then caching the result in memory for the execution plan to use for each partition).
I'd argue that while this problem is worth solving, doing so is tangential to this change.
I'd like to see this solved, but I see no reason why we couldn't solve this in a follow-on.
Probably worth documenting the practical consequences of leaving it in this state though -- correct me if I'm wrong here, but I think this means that we end up hydrating the entire file into memory for certain operations, right? That's probably not a good long-term state.
I can't imagine this would mean I need to read the entire file into memory and keep it there? In my previous message I meant we would need to read all the record batch and dictionary locations and keep them in memory in much the same way that the arrow file format footer does. So it would mean a single pass through to record all of that and then multiple threads can seek to different parts of the file and process it.
That's my understanding of the effect of this, that it means we can't parallelize queries against this file format.
If you believe that the resulting behavior would be pathological to the extreme then we should absolutely document that. Thoughts on how we can reliably test that it is? Or who might be aware of the implications of this? And where to document it?
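As a rough illustration of the "read the whole file once and cache the result" fallback mentioned above — not what this PR does; the helper name and the round-robin partitioning are made up for the sketch:

```rust
use std::fs::File;

use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use arrow::record_batch::RecordBatch;

/// Read an IPC stream-format file end to end exactly once, then split the
/// decoded batches into chunks that an execution plan could hand to its
/// partitions (so no partition has to re-read the file from the start).
fn cache_stream_file_batches(
    path: &str,
    n_partitions: usize,
) -> Result<Vec<Vec<RecordBatch>>, ArrowError> {
    let reader = StreamReader::try_new(File::open(path)?, None)?;
    let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;

    // Round-robin the decoded batches; a real implementation would probably
    // balance on row counts or byte sizes instead.
    let parts = n_partitions.max(1);
    let mut partitions = vec![Vec::new(); parts];
    for (i, batch) in batches.into_iter().enumerate() {
        partitions[i % parts].push(batch);
    }
    Ok(partitions)
}
```

The downside is exactly the one discussed above: the whole file's batches end up resident in memory, whereas recording block offsets in a single pass (an ad-hoc footer) would let partitions seek instead.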
);

let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
let meta_len = i32::from_le_bytes(meta_len);
I think it should be possible to (manually) manipulate the file's bytes in such a way that it produces a negative i32 here.
Then the cast to usize below will lead to problems.
What is the reason for meta_len to be i32 instead of u32?
Hmm, maybe I was referring to the spec: https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
And saw it say <metadata_size: int32> so I defaulted to i32 🤔
Checking for a valid i32 (i.e. non-negative) does sound reasonable for robustness.
I pushed up a refactor where I'm checking that it's not negative now, we should be good.
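For reference, a minimal sketch of the kind of non-negative check discussed here, assuming `bytes` holds the raw IPC message framing and the helper name is illustrative rather than the PR's actual code:

```rust
use arrow::error::ArrowError;

/// Read the 4-byte little-endian metadata length at `offset`, rejecting
/// negative values before the cast to usize.
fn parse_meta_len(bytes: &[u8], offset: usize) -> Result<usize, ArrowError> {
    let raw: [u8; 4] = bytes
        .get(offset..offset + 4)
        .ok_or_else(|| ArrowError::ParseError("Unexpected end of IPC message".to_string()))?
        .try_into()
        .map_err(|err| {
            ArrowError::ParseError(format!("Unable to read IPC metadata length: {err:?}"))
        })?;
    // The spec declares <metadata_size: int32>, so read it as i32 and reject
    // negative values instead of letting them wrap during the usize cast.
    let meta_len = i32::from_le_bytes(raw);
    if meta_len < 0 {
        return Err(ArrowError::ParseError(format!(
            "Negative IPC metadata length: {meta_len}"
        )));
    }
    Ok(meta_len as usize)
}
```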
let statistics = &self.projected_statistics;
Ok(statistics
    .clone()
    .expect("projected_statistics must be set"))
Does it need to panic here? Would it be better to return an Err?
I'm not sure if we need the panic here; I'm mirroring the other Arrow FileSource. I'm happy to change both, but I'll need to track down the implications of this tonight. I was trying to minimize changes in behavior since I'm a new contributor.
Best I can tell, it was introduced in #14224.
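Not the code in this PR — just a rough sketch of the Err-returning alternative being discussed, with the free-function shape and error message assumed for illustration:

```rust
use datafusion_common::{internal_err, Result, Statistics};

/// Fail with a DataFusionError instead of panicking when the projected
/// statistics were never populated.
fn projected_statistics(stats: &Option<Statistics>) -> Result<Statistics> {
    match stats {
        Some(s) => Ok(s.clone()),
        None => internal_err!("projected_statistics must be set before statistics() is called"),
    }
}
```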
}

// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
// See <https://github.com/apache/arrow-rs/issues/5021>
I haven't fully reviewed this PR, but I'm just curious whether you've managed to check if this code has been upstreamed to arrow-rs by now so we might be able to leverage its code?
I looked into using the various readers available. FileDecoder requires a schema to create the struct, which defeats the point entirely, and FileReader requires the passed-in object to support Read + Seek (we're dealing with a stream of bytes here that only does Read). I think I could keep the magic bytes handling here, then use a Cursor over the bytes already read, chain it with the remainder of the stream, and pass that into a StreamReader to parse the schema. So, still a little bit of parsing, but much less.
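Roughly what that Cursor-plus-chain idea could look like with a synchronous reader — the function name is made up, and the real code deals with an async stream, which is the complication noted in the next comment:

```rust
use std::io::{Cursor, Read};

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;

/// Glue the bytes consumed while checking magic numbers back onto the front
/// of the stream so StreamReader sees a well-formed IPC stream, then let it
/// parse the schema for us.
fn infer_stream_schema<R: Read>(
    already_read: Vec<u8>,
    rest_of_stream: R,
) -> Result<SchemaRef, ArrowError> {
    let reader = Cursor::new(already_read).chain(rest_of_stream);
    let stream_reader = StreamReader::try_new(reader, None)?;
    Ok(stream_reader.schema())
}
```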
Unfortunately, the fact that it's an async stream breaks a lot of the things we could do with upstream functions. We need to know how much to read off the stream to use it synchronously, which means we need to do some parsing. I've significantly refactored it and like the result better, but I'm going to stick with the parsing as it is, more or less.
I think this is basically right. Couple of nits, one question.
    conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
    let source = Arc::new(ArrowSource::default());
    let is_stream_format = if let Some(first_group) = conf.file_groups.first() {
Maybe worth pulling this out into a helper method that's easy to test. Then this method also reads a bit cleaner, with just an is_stream_format() check as opposed to this block of logic, which is not directly relevant to creating a physical plan.
Fixed! I opted to go with a positive check for whether it's in the Arrow file format. Future steps that perform the actual parsing of the file should catch cases where it's not in the Arrow stream format either.
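For illustration, a minimal sketch of what such a positive check can look like, assuming we have the file's leading bytes in hand (the helper name is made up, not the PR's; the Arrow IPC file format starts with the ARROW1 magic bytes, while the stream format does not):

```rust
const ARROW_MAGIC: [u8; 6] = *b"ARROW1";

/// Returns true if the leading bytes look like the Arrow IPC *file* format,
/// which starts with the ARROW1 magic; stream-format files do not.
fn is_arrow_file_format(leading_bytes: &[u8]) -> bool {
    leading_bytes.len() >= ARROW_MAGIC.len()
        && leading_bytes[..ARROW_MAGIC.len()] == ARROW_MAGIC
}
```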
| "Unexpected end of byte stream for Arrow IPC file".to_string(), | ||
| ))?; | ||
| ) | ||
| .into()); |
Why?
return Err(...)? is redundant; you really only need either a bare Err(...)? or a return Err(...). But a bare Err(...)? looks funny to me, and we still need to convert the ArrowError into a DataFusionError (which ? does for us automatically), so we end up with return Err(...).into()
err, return Err(err.into()) in this case
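A tiny standalone illustration of the two styles being compared — the function names and error message are placeholders, not the PR's code:

```rust
use arrow::error::ArrowError;
use datafusion_common::DataFusionError;

// Style A: a bare `Err(...)?` as the tail expression; `?` performs the
// ArrowError -> DataFusionError conversion, but it reads a little oddly.
fn style_a() -> Result<(), DataFusionError> {
    Err(ArrowError::ParseError("unexpected end of byte stream".to_string()))?
}

// Style B: the explicit conversion plus return that the thread settles on.
fn style_b() -> Result<(), DataFusionError> {
    let err = ArrowError::ParseError("unexpected end of byte stream".to_string());
    return Err(err.into());
}
```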
let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = (
    bytes[preamble_size..preamble_size + 4]
        .try_into()
        .map_err(|err| {
            ArrowError::ParseError(format!(
                "Unable to read IPC message as metadata length: {err:?}"
            ))
        })?,
    preamble_size + 4,
);
Am I reading this right that rest_of_bytes_start_index is always just preamble_size + 4?
If that's the case, it may be clearer to do two separate assignments, i.e.:
let rest_of_bytes_start_index: usize = preamble_size + 4;
let meta_len: [u8; 4] = bytes[preamble_size..rest_of_bytes_start_index]
    .try_into()
    .map_err(|err| {
        ArrowError::ParseError(format!(
            "Unable to read IPC message as metadata length: {err:?}"
        ))
    })?;
Fixed in the version I'm pushing up.
Which issue does this PR close?
register_arrow or similar #16688.
Rationale for this change
Currently DataFusion can only read Arrow files if they're in the File format, not the Stream format. I work with a bunch of Stream format files and wanted native support.
What changes are included in this PR?
To accomplish the above, this PR splits the Arrow datasource into two separate implementations (ArrowStream* and ArrowFile*) with a facade on top to differentiate between the formats at query planning time.
Are these changes tested?
Yes, there are end-to-end sqllogictests along with tests for the changes within datasource-arrow.
Are there any user-facing changes?
Technically yes, in that we support a new format now. I'm not sure which documentation would need to be updated?